package com.ken.common.event.framework.handle.defaults;

import com.ken.common.event.apply.handle.EventHandler;
import com.ken.common.event.apply.handle.annotation.EventType;
import com.ken.common.event.framework.handle.MsgHandler;
import com.ken.common.event.framework.message.EventMessage;
import com.ken.common.event.framework.processor.EventMsgPostProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;

import java.text.SimpleDateFormat;
import java.util.List;
import java.util.concurrent.Executor;

/**
 * 架构默认的消息处理器
 */
@Slf4j
public class DefaultMsgHandler implements MsgHandler {

    @Autowired
    private List<EventHandler> eventHandlers;

    @Autowired(required = false)
    private List<EventMsgPostProcessor> postProcessors;

    /**
     * 异步处理的线程池
     */
    @Autowired
    private Executor executor;

    @Override
    public void msgHandler(String eventTypeStr, EventMessage eventMsg) {
        //循环多个事件处理器
        A:for (EventHandler eventHandler : eventHandlers) {
            //获得EventHandler实现类的注解
            EventType eventType = eventHandler.getClass().getAnnotation(EventType.class);
            //判断事件类型是否匹配
            if (!eventTypeStr.equals(eventType.value())) continue A;

            log.debug("[Msg-Handle] - 消费端接收到可消费的消息！事件类型 - {} 消息发送时间 - {}", eventTypeStr, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(eventMsg.getCreateTime()));

            //创建一个Runnable对象
            Runnable runnable = () -> {
                //获得消息的实际消息体对象
                Object msgBody = eventMsg.getMsg();
                //异常信息
                Throwable throwable = null;
                try {
                    //消息的前置处理
                    log.debug("[Msg-Handle] - 消息前置处理器开始执行...");
                    if (!CollectionUtils.isEmpty(postProcessors)) {
                        for (EventMsgPostProcessor postProcessor : postProcessors) {
                            if (postProcessor.isSupport(eventTypeStr, eventMsg, eventHandler)) {
                                if (!postProcessor.beginProcessor(eventMsg, eventHandler)) {
                                    log.debug("[Msg-Handle] - 前置处理器中断了消息的后续消费！");
                                    return;
                                }
                            }
                        }
                    }
                    log.debug("[Msg-Handle] - 消息后置处理器执行完成！");

                    //当前路由键 匹配上了 当前的实现类，该事件应该交给当前的实现类处理
                    log.debug("[Msg-Handle] - 正式执行消息消费的方法 - {}", msgBody);
                    eventHandler.eventHandler(msgBody, eventMsg);
                } catch (Exception e) {
                    throwable = e;
                    e.printStackTrace();
                }

                //消息的后置处理
                log.debug("[Msg-Handle] - 消息后置处理器开始执行...");
                if (!CollectionUtils.isEmpty(postProcessors)) {
                    for (EventMsgPostProcessor postProcessor : postProcessors) {
                        if (postProcessor.isSupport(eventTypeStr, eventMsg, eventHandler)) {
                            if (!postProcessor.afterProcessor(eventMsg, eventHandler, throwable)) {
                                log.debug("[Msg-Handle] - 后置处理器执行中断！");
                                return;
                            }
                        }
                    }
                }
                log.debug("[Msg-Handle] - 消息后置处理器执行完成！");
            };
            //根据条件是否异步执行
            if (!eventType.isAsync()) {
                //同步执行
                runnable.run();
            } else {
                //异步执行
                executor.execute(runnable);
            }
        }
    }
}
